/**
 *  Copyright 2007-2008 University Of Southern California
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package edu.isi.pegasus.planner.parser.dax;


import edu.isi.pegasus.planner.parser.*;
import edu.isi.pegasus.common.logging.LoggingKeys;
import edu.isi.pegasus.planner.catalog.site.classes.GridGateway;

import edu.isi.pegasus.common.logging.LogManager;

import edu.isi.pegasus.planner.classes.PCRelation;
import edu.isi.pegasus.planner.classes.PegasusFile;
import edu.isi.pegasus.planner.classes.FileTransfer;
import edu.isi.pegasus.planner.classes.Job;
import edu.isi.pegasus.planner.classes.PegasusBag;

import edu.isi.pegasus.planner.namespace.Hints;
import edu.isi.pegasus.planner.namespace.Namespace;

import edu.isi.pegasus.planner.parser.dax.Callback;

import edu.isi.pegasus.common.util.Version;

import edu.isi.pegasus.planner.catalog.replica.ReplicaCatalogEntry;
import edu.isi.pegasus.planner.catalog.transformation.TransformationCatalogEntry;
import edu.isi.pegasus.planner.catalog.transformation.classes.Arch;
import edu.isi.pegasus.planner.catalog.transformation.classes.Os;
import edu.isi.pegasus.planner.catalog.transformation.classes.TCType;
import edu.isi.pegasus.planner.catalog.transformation.classes.VDSSysInfo;
import org.xml.sax.Attributes;
import org.xml.sax.SAXException;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.StringTokenizer;
import java.util.Set;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import edu.isi.pegasus.planner.classes.DAGJob;
import edu.isi.pegasus.planner.classes.DAXJob;
import edu.isi.pegasus.planner.classes.ReplicaLocation;
import edu.isi.pegasus.planner.code.GridStartFactory;
import edu.isi.pegasus.planner.namespace.Dagman;
import edu.isi.pegasus.planner.namespace.Pegasus;

/**
 * This class parses the XML file whichis generated by Abstract Planner and ends
 * up making an ADag object which contains theinformation to make the Condor
 * submit files. The parser used to parse the file is Xerces.
 *
 * @author Karan Vahi
 * @author Gaurang Mehta
 * @version $Revision: 4559 $
 *
 * @see org.griphyn.cPlanner.classes.Job
 * @see org.griphyn.cPlanner.classes.DagInfo
 * @see org.griphyn.cPlanner.classes.ADag
 * @see org.griphyn.cPlanner.classes.PCRelation
 */

public class DAXParser2 extends Parser implements DAXParser {

    /**
     * The "not-so-official" location URL of the DAX schema definition.
     */
    public static final String SCHEMA_LOCATION =
                                 "http://pegasus.isi.edu/schema/dax-3.0.xsd";

    /**
     * URI namespace
     */
    public static final String SCHEMA_NAMESPACE =
                                           "http://pegasus.isi.edu/schema/DAX";



    /**
     * The constant designating the version when the double negative transfer
     * and registration flags were removed.
     */
    private static final String DAX_VERSION_WITHOUT_DOUBLE_NEGATIVE = "2.1";

    /**
     * The map that maps keys in execution tag to hints namespace.
     */
    private static Map<String,String> mExecutionToHintsNamespace ;
    
    /**
     * Maps the execution key to corresponding key in hints namespace.
     * 
     * @param key  the key in execution tag
     * 
     * @return corresponding key in hints namespace else null
     */
    private static String executionToHintsNamespace( String key ){
        if ( mExecutionToHintsNamespace == null ){
            mExecutionToHintsNamespace = new HashMap<String,String>();
            mExecutionToHintsNamespace.put( "site", Hints.EXECUTION_POOL_KEY );
            mExecutionToHintsNamespace.put( "executable", Hints.PFN_HINT_KEY  );
        }
        return mExecutionToHintsNamespace.get( key );
    }

    public String mDaxSchemaVersion;


    /**
     * A boolean variable set to true when we have got all the logical filenames.
     * After this all the filename tags are not added in Vector mLogicalFilesInADag
     * This is because the DAX file specifies all the input and output files
     * in the starting, and then in the job tags also the filename tags are nested.
     */
    private boolean infoAboutAllFilesRecv = false;

    /**
     * The handle to the class implementing the callback interface.
     */
    private Callback mCallback;

    /**
     * For holding the key attribute in profile tag.
     */
    private String mProfileKey = new String();

   

    /**
     * For holding the namespace if specified in the Profile Element.
     */
    private String mNamespace = new String();

    /**
     * Set as and when Profile and Argument tags are started and ended.
     * Need to in order to determine the nested filename tags which may appear in
     * these elements.
     */
    private boolean mProfileTag = false;
    private boolean mArgumentTag = false;



    /**
     * These store the current child element for the child parent relationship.
     * We get nested parent elements in a child element. Hence the child remains
     * the same while the parent id for the relationship varies.
     */
    private String mCurrentChildId = new String();

    /**
     * The list of parents of a node referred to by mCurrentChildId.
     */
    private List<PCRelation> mParents;


    /**
     * Holds information regarding the current job being parsed. It's scope can
     * be seen as the job element.
     */
    private Job mCurrentJobSubInfo = new Job();



    /**
     * All the arguments to a particular job.
     */
    private String mWholeCommandString = new String();

    /**
     * Holds the input files for a particular job making the aDag. They are Vector
     * of PegasusFile Objects which store the transiency information of each
     * logical file.
     *
     * @see org.griphyn.cPlanner.classes.PegasusFile
     */
    private Set mVJobInpFiles = new HashSet();

    /**
     * Holds the output files for a particular job making the aDag.
     * They are vector of PegasusFile Objects which store the transiency
     * information of each logical file.
     *
     * @see org.griphyn.cPlanner.classes.PegasusFile
     */
    private Set mVJobOutFiles = new HashSet();

    /**
     * A boolean indicating whether to use the double negative flags for
     * transfer and register or not.
     */
    private boolean mUseDoubleNegative;

    /**
     * The job prefix that needs to be applied to the job file basenames.
     */
    protected String mJobPrefix;

    /**
     * The uses link type for a file when uses tag is parsed.
     */
    private String mUsesLinkType;
    
    /**
     * The PegasusFile object that a uses tag corresponds to.
     */
    private PegasusFile mUsesPegasusFile;
    
    /**
     * The file attribute encountered in the dax element.
     */
    private String mDAXLFN;
    
    /**
     * The file attribute encountered in the dag element.
     */
    private String mDAGLFN;

    /**
     * The default constructor
     *
     * @param bag   the bag of objects that is useful for initialization.
     */
    public DAXParser2( PegasusBag bag ) { //default constructor
        super( bag );
        
        
        mUseDoubleNegative = false;
        mJobPrefix = ( bag.getPlannerOptions() == null ) ?
                       null:
                       bag.getPlannerOptions().getJobnamePrefix();
    }


    /**
     * Set the DAXCallback for the parser to call out to.
     *
     * @param c  the callback
     */
    public void setDAXCallback( Callback c ){
        this.mCallback = c;
    }

    /**
     * Retuns the DAXCallback for the parser
     *
     * @return  the callback
     */
    public Callback getDAXCallback(  ){
        return this.mCallback;
    }

    /**
     * This starts the parsing of the file by the parser.
     *
     * @param daxFileName    the path/uri to the XML file you want to parse.
     */
    public void startParser(String daxFileName) {
        try{
            this.testForFile(daxFileName);
        }
        catch( Exception e){
            throw new RuntimeException( e );
        }

        //try to get the version number
        //of the dax
        mDaxSchemaVersion = getVersionOfDAX( daxFileName );
        mLogger.log( "DAXParser2 Version of DAX as picked up from the DAX " + mDaxSchemaVersion,
                      LogManager.DEBUG_MESSAGE_LEVEL );
        String schemaLoc = getSchemaLocation();
        mLogger.log( "DAXParser2 Picking schema for DAX " + schemaLoc, LogManager.DEBUG_MESSAGE_LEVEL );
        String list = DAXParser2.SCHEMA_NAMESPACE + " " + schemaLoc;
        setSchemaLocations(list);

        //figure out whether to pick up the double negative flags or not
        mUseDoubleNegative = useDoubleNegative( mDaxSchemaVersion );
        mLogger.log( "DAXParser2 Picking up the dontTransfer and dontRegister flags " + mUseDoubleNegative,
                     LogManager.DEBUG_MESSAGE_LEVEL );

        mLogger.logEventStart( LoggingKeys.EVENT_PEGASUS_PARSE_DAX, LoggingKeys.DAX_ID, daxFileName );

        mCurrentJobSubInfo.condorUniverse = GridGateway.JOB_TYPE.compute.toString(); //default value


        try {
            mParser.parse(daxFileName);
        }
        catch (Exception e) {
            //if a locator error then
            String message = (mLocator == null) ?
                           "DAXParser2 While parsing the file " + daxFileName:
                           "DAXParser2 While parsing file " + mLocator.getSystemId() +
                           " at line " + mLocator.getLineNumber() +
                           " at column " + mLocator.getColumnNumber();
            throw new RuntimeException(message, e);
        }

        mLogger.logEventCompletion();
    }

    /**
     * Overriding the empty implementation provided by
     * DefaultHandler of ContentHandler. This receives the notification
     * from the sacks parser when start tag of an element comes
     */
    public void startElement(String uri, String local, String raw,
                             Attributes attrs) throws SAXException {

        //setting the command line option only if textContent > 0
        if (mTextContent.length() > 0) {
            mWholeCommandString = mWholeCommandString.concat(new String(
                mTextContent));
            //System.out.println("\n Text Content is:" + new String(mTextContent));
            //resetting the buffer
            mTextContent.setLength(0);
        }

        local = local.trim();
        //dealing with ADag tag
        if (local.equalsIgnoreCase("adag")) {
            handleAdagTagStart(local, attrs);
        }

        //deal with execution tag dax 3.0
        else if ( local.equalsIgnoreCase( "execution" ) ){
            handleExecutionTagStart( local, attrs );
        }
        
        //dealing with filename tags
        else if (local.equalsIgnoreCase("filename")) {
            handleFilenameTagStart(local, attrs);
        }

        //dealing with the uses tag July 18
        else if (local.equalsIgnoreCase("uses")) {
            handleUsesTagStart(local, attrs);
        }

        //dealing with pfn tag in the uses tag
        // for dax 3.0 schema
        else if ( local.equalsIgnoreCase( "pfn" ) ){
            handleUsesPFNTagStart( local, attrs );
        }
        
        //dealing with the job tags
        else if (local.equalsIgnoreCase("job")) {
            handleJobTagStart(local, attrs);
        }

        //dealing with the dax tags for DAX 3.0
        else if (local.equalsIgnoreCase("dax")) {
            handleDAXTagStart(local, attrs);
        }
        
        //dealing with the dag tags for DAX 3.0
        else if (local.equalsIgnoreCase("dag")) {
            handleDAGTagStart(local, attrs);
        }
        
        //dealing with metadata tags for DAX 3.0
        else if (local.equalsIgnoreCase("metadata")) {
            handleMetadataTagStart(local, attrs);
        }
        
        //dealing with profile tag
        else if (local.equalsIgnoreCase("profile")) {
            handleProfileTagStart(local, attrs);
        }

        //dealing with the making of parent child relationship pairs
        else if (local.equalsIgnoreCase("child")) {
            handleChildTagStart(local, attrs);
        }
        else if (local.equalsIgnoreCase("parent")) {
            handleParentTagStart(local, attrs);
        }

        //dealing with the start of argument tag
        else if (local.equalsIgnoreCase("argument")) {
            handleArgumentTagStart(local, attrs);
        }

        //dealing with stdout for current job
        else if (local.equalsIgnoreCase("stdout")) {
            handleStdoutTagStart(local, attrs);
        }

        //dealing with stdin for current job
        else if (local.equalsIgnoreCase("stdin")) {
            handleStdinTagStart(local, attrs);
        }

        //dealing with stderr for current job
        else if (local.equalsIgnoreCase("stderr")) {
            handleStderrTagStart(local, attrs);
        }

    }

    /**
     * A convenience method that tries to determine the version of the dax
     * schema by reading ahead in the DAX file, and searching for
     * the version attribue in the file.
     *
     * @param file the name of the dax file.
     */
    public String getVersionOfDAX(String file){
        String schema = getSchemaOfDocument(file);
        return extractVersionFromSchema(schema);

    }

    /**
     * Determines the version of the DAX as specified in a schema string.
     *
     * @param schema   the schema string as specified in the root element of
     *                 the DAX.
     *
     * @return  the version.
     */
    public String extractVersionFromSchema(String schema){
        String token = null;
        String version = null;

        if(schema == null)
            return null;

        StringTokenizer st = new StringTokenizer(schema);
        while(st.hasMoreTokens()){
            token = st.nextToken();
            if(token.endsWith(".xsd")){
                //we got our match
                String name = new File(token).getName();
                int p1 = name.indexOf("dax-");
                int p2 = name.lastIndexOf(".xsd");
                //extract the version number
                version = (( p1 > -1) && (p2 > -1))?name.substring(p1+4,p2):null;
                return version;

            }
        }

        mLogger.log("Could not find the version number in DAX schema name",
                    LogManager.WARNING_MESSAGE_LEVEL);
        return version;

    }

    /**
     * A convenience method that tries to get the name of the schema the document
     * refers to. It returns the value of the xsi:schemaLocation.
     *
     * @param file the name of the dax file.
     */
    public String getSchemaOfDocument(String file){
        StringTokenizer st = null;
        String key = null;
        String value = null;

        try{
            BufferedReader in = new BufferedReader(new FileReader(file));
            String line = null;
            int p1 , p2 , c = 0;

            while ( (line = (in.readLine()).trim()) != null) {

                if(c == 0){
                    //try to check if it is an xml file
                    if ( ( (p1 = line.indexOf("<?xml")) > -1) &&
                         ( (p2 = line.indexOf("?>", p1)) > -1) ) {
                            //xml file is valid.
                            c++;

                    }
                    else{
                        //throw a exception
                        throw new java.lang.RuntimeException("Dax File is not xml " + file);
                    }

                }
                else{
                    if( (p1 = line.indexOf("<adag")) > -1){
                        line = line.substring(p1 + "<adag".length());
                        c++;

                    }
                    else{
                        if(c < 2)
                            //goto next line
                            continue;
                    }

                    st = new StringTokenizer(line,"= \"");
                    while(st.hasMoreTokens()){
                        c++;
                        if(c%2 == 1){
                            key = st.nextToken().trim();
                        }
                        else{
                            if(key.equalsIgnoreCase("xsi:schemaLocation")){
                                value = st.nextToken("=\"");
                                return value;
                            }
                            else{
                                value = st.nextToken();
                            }


                        }

                    }
                }
            }
        }
        catch(java.io.IOException e){
            mLogger.log("Parsing the dax file for version number " + " :" +
                        e.getMessage(),LogManager.ERROR_MESSAGE_LEVEL);

        }
        return null;

    }


    /**
     * Invoked when the starting of the adag element is got.
     * Information received is
     * name :  the name of the ADag
     * count:  Chimera can generate multiple abstract dags for a request.
     * index:  what is the index of the ADag being passed. Should
     *         vary between 0 and count - 1.
     */
    private void handleAdagTagStart(String local, Attributes attrs) {
        HashMap mp = new HashMap();
        String key;
        String value;

        for(int i = 0; i < attrs.getLength(); i++){
            key = attrs.getLocalName(i);
            value = attrs.getValue(i);
            //should probably check for valid attributes before setting
            mp.put(key,value);
            //System.out.println(key + " --> " + value);
        }
        //call the callback interface
        mCallback.cbDocument(mp);




    }

    /**
     * Replaces the keys associated with the execution tag, with the corresponding
     * keys in the hints profile namespace
     * 
     * @param local  the local name of the leemnt
     * @param attrs  the attributes
     */
    private void handleExecutionTagStart( String local, Attributes attrs ) {
        String key =  attrs.getValue("key");
        mProfileKey = executionToHintsNamespace( key );
        if( mProfileKey == null ){
            throw new RuntimeException( "Invalid key associated with execution tag " + key );
        }
        
        mNamespace = Hints.NAMESPACE_NAME;
        mProfileTag = true;     
    }


    /**
     * Invoked when the starting of the filename element is got.
     */
    private void handleFilenameTagStart(String local, Attributes attrs) {
        String linkType = new String(); //holds the link info about a logical file corr to a job
        String fileName = new String();
        String isTemp = new String();

        fileName = attrs.getValue("", "file").trim();

        PegasusFile pf = new PegasusFile(fileName);

        if (!infoAboutAllFilesRecv) {
            //this means we are dealing with filename tags in
            //the starting of the dax. These tags
            //contain the linkage information
            //logicalFilesInADag.addElement(fileName);

            
        }
        else if (mArgumentTag) {
            //means that the filename tag is nested in
            //an argument tag. Since dax 1.6
            //no linkage information comes
            //in this.
            mWholeCommandString = mAdjFName?
                                  //as per the default behaviour adding
                                  //a whitespace between two adjacent
                                  //filename tags
                                  mWholeCommandString  + " " + fileName:
                                  //else doing a simple concatenation
                                  mWholeCommandString  + fileName;

            mAdjFName     = true;
        }
        //dealing with profile tags
        else if (mProfileTag) { //means that filename tag is nested in a profile tag
            fileName = attrs.getValue("", "file");

            //an extra check
            if (mNamespace.equalsIgnoreCase("env")) {
                
                //namespace class member variables removed
                //mEnvNS.checkKeyInNS(mProfileKey,fileName);
                mCurrentJobSubInfo.envVariables.checkKeyInNS( mProfileKey,fileName );
            }
        }

    } //end of dealing with fileName tags in argument tag

    /**
     * Metadata parsing is ignored for time being.
     * 
     * @param local
     * @param attrs
     */
    private void handleMetadataTagStart(String local, Attributes attrs) {
        mLogger.log( "metadata element parsing is ignored for job " + mCurrentJobSubInfo.getID(),
                      LogManager.DEBUG_MESSAGE_LEVEL );
    }
    
    /**
     * Resets the text content buffer
     */
    private void handleMetadataTagEnd() {
        //reset buffer
        mTextContent.setLength( 0 );
                
    }
    
    /**
     * Invoked when the starting of the uses element is got. Uses tag is used to
     * denote all the files a particular job uses, be it as input , output or
     * silent.
     */
    private void handleUsesTagStart(String local, Attributes attrs) {
        String fileName = attrs.getValue("", "file");
        String linkType = attrs.getValue("", "link");
        String isTemp   = attrs.getValue("", "isTemporary");
        String type     = attrs.getValue("", "type");
        String size     = attrs.getValue("", "size" );
        
        mUsesLinkType = linkType;
        
        //since dax 1.6, the isTemporary
        //is broken into two transient
        //attributes dontTransfer and dontRegister

        //pick up the registration flag
        boolean register = ( mUseDoubleNegative )?
                           //pick up the dR flag
                           !new Boolean( attrs.getValue( "", "dontRegister" ) ).booleanValue():
                           //pick up the register flag
                           new Boolean( attrs.getValue( "", "register" ) ).booleanValue();

        //boolean dontRegister = new Boolean(attrs.getValue("","dontRegister")).booleanValue();

        //notion of optional file since dax 1.8
        boolean optionalFile = new Boolean( attrs.getValue( "", "optional" ) ).booleanValue();

        //value of dontTransfer is tri state (true,false,optional) since dax 1.7
        String transfer = ( mUseDoubleNegative )?
                          attrs.getValue( "", "dontTransfer" ):
                          attrs.getValue( "" , "transfer" );
        //String dontTransfer = attrs.getValue("","dontTransfer");
        PegasusFile pf = new PegasusFile(fileName);

        //handling the transient file feature
        if (isTemp != null) {
            //this for dax 1.5 handling
            boolean temp = new Boolean(isTemp.trim()).booleanValue();
            if (temp) {
                //set the transient flags
                pf.setTransferFlag(PegasusFile.TRANSFER_NOT);
                register = false;
            }
        }
        else{
            //set the transfer mode for the file
            //for dax 1.5 onwards
            pf.setTransferFlag( transfer, mUseDoubleNegative );
        }
        //handling the dR flag
//        if( !register ){
//            pf.setTransientRegFlag();
//        }
        pf.setRegisterFlag( register );
            
        //handling the optional attribute
        if(optionalFile)
            pf.setFileOptional();

        //handle type of file
        if( type != null )
            pf.setType( type );

        //handle the size of file
        pf.setSize( size );
        
        //store for later reference in endUses method
        mUsesPegasusFile = pf;
   
    }
    
   
    /**
     * Handles the end of a uses tag.
     */
    private void handleUsesTagEnd(){
        
        if ( mUsesLinkType.trim().equalsIgnoreCase("input")) {
            mVJobInpFiles.add( mUsesPegasusFile );
        }
        else if ( mUsesLinkType.trim().equalsIgnoreCase("output")) {
            mVJobOutFiles.add( mUsesPegasusFile );
            //the notion of an optional file as an output would mean it
            //has the optional transfer flag set.
            if( mUsesPegasusFile.fileOptional() &&
                mUsesPegasusFile.getTransferFlag() == PegasusFile.TRANSFER_MANDATORY){
                //update the transfer flag to optional
                mUsesPegasusFile.setTransferFlag(PegasusFile.TRANSFER_OPTIONAL);
            }
        }
        else if ( mUsesLinkType.trim().equalsIgnoreCase("inout")) {
            mVJobInpFiles.add( mUsesPegasusFile );
            mVJobOutFiles.add( mUsesPegasusFile );
        }
        
        //reset the tracking variables
        mUsesPegasusFile = null;
        mUsesLinkType    = null;
    }

    
    /**
     * Invoked when start of the pfn element nested in uses element is encountered
     * 
     * @param local      the local name of the element
     * @param attrs  the map of attributes and values in the element tag
     */
    private void handleUsesPFNTagStart( String local, Attributes attrs ){
        String url  = attrs.getValue( "", "url" );
        String site = attrs.getValue( "", "site" );
        
        //convert the existing PegasusFile object to it's physical
        //mapping , FileTransfer object
       /*        FileTransfer ft = new FileTransfer( mUsesPegasusFile );
        //the linkage type determines whether it is input or source
        if ( mUsesLinkType.trim().equalsIgnoreCase("input")) {
            ft.addSource( site, url );
        }
        else{
            ft.addDestination( site, url );
        }
        mUsesPegasusFile = ft;
       */

        if ( mUsesLinkType.trim().equalsIgnoreCase("input")) {
            //create a new replica catalog entry
            //only for input files. we dont care about output file pfn's
            ReplicaLocation rl = new ReplicaLocation( );
            rl.setLFN( this.mUsesPegasusFile.getLFN() );
            ReplicaCatalogEntry rce = new ReplicaCatalogEntry( );
            //site = ( site == null || site.length() == 0 )? "unknown" : site;
            rce.setResourceHandle( site );
            rce.setPFN( url );
            rl.addPFN( rce );
            this.mCallback.cbFile(rl);
        }
    }
 
    /**
     * Invoked when the starting of the dax element is retrieved. The
     * DAG element  extends on the job element.
     * 
     * 
     * @param local  the local name of the element
     * @param attrs  the attributes
     */
    private void handleDAGTagStart( String local, Attributes attrs ) {
        mCurrentJobSubInfo = new DAGJob();
        
        //the job should be tagged type pegasus
        
        //the job should always execute on local site
        //for time being
        mCurrentJobSubInfo.hints.construct( Hints.EXECUTION_POOL_KEY, "local" );
        
        //also set the executable to be used
        mCurrentJobSubInfo.hints.construct( Hints.PFN_HINT_KEY, "/opt/condor/bin/condor-dagman" );
        
        //retrieve the extra attribute about the DAX
        mDAGLFN = attrs.getValue("", "file");
        ((DAGJob)mCurrentJobSubInfo).setDAGLFN( mDAGLFN );
        
        //add default name and namespace information
        mCurrentJobSubInfo.setTransformation( "condor",
                                              "dagman",
                                              null );
        
        
        mCurrentJobSubInfo.setDerivation( "condor",
                                          "dagman",
                                           null );
                             
        mCurrentJobSubInfo.level       = (attrs.getValue("","level") == null) ?
                                         -1:
                                         Integer.parseInt(attrs.getValue("","level"));
                                         
        mCurrentJobSubInfo.setLogicalID( attrs.getValue("", "id") );
        
        mCurrentJobSubInfo.vdsNS.construct( Pegasus.GRIDSTART_KEY,
                            GridStartFactory.GRIDSTART_SHORT_NAMES[GridStartFactory.NO_GRIDSTART_INDEX] );

        
        handleJobTagStart( mCurrentJobSubInfo  );
        mCurrentJobSubInfo.setName( ((DAGJob)mCurrentJobSubInfo).generateName( this.mJobPrefix) );
    }
    
    /**
     * Invoked when the starting of the dax element is retrieved. The
     * DAX element is a extends on the job element.
     * 
     * 
     * @param local  the local name of the element
     * @param attrs  the attributes
     */
    private void handleDAXTagStart( String local, Attributes attrs ) {
        mCurrentJobSubInfo = new DAXJob();
        
        //the job should be tagged type pegasus
        mCurrentJobSubInfo.setTypeRecursive();
        
        //the job should always execute on local site
        //for time being
        mCurrentJobSubInfo.hints.construct( Hints.EXECUTION_POOL_KEY, "local" );
        
        //also set the executable to be used
        mCurrentJobSubInfo.hints.construct( Hints.PFN_HINT_KEY, "/tmp/pegasus-plan" );
        
        //retrieve the extra attribute about the DAX
        mDAXLFN = attrs.getValue("", "file");
        ((DAXJob)mCurrentJobSubInfo).setDAXLFN( mDAXLFN );
        
        //add default name and namespace information
        mCurrentJobSubInfo.setTransformation( "pegasus",
                                              "pegasus-plan",
                                              Version.instance().toString() );
        
        
        mCurrentJobSubInfo.setDerivation( "pegasus",
                                          "pegasus-plan",
                                          Version.instance().toString() );
                             
        mCurrentJobSubInfo.level       = (attrs.getValue("","level") == null) ?
                                         -1:
                                         Integer.parseInt(attrs.getValue("","level"));
                                         
        mCurrentJobSubInfo.setLogicalID( attrs.getValue("", "id") );
        
        handleJobTagStart( mCurrentJobSubInfo  );
        
        mCurrentJobSubInfo.setName( ((DAXJob)mCurrentJobSubInfo).generateName( this.mJobPrefix) );
    }
    
    /**
     * Invoked when the starting of the job element is got. The following
     * information is retrieved from the tag
     *
     * name      : name of the job, which is the logical name of the job.
     * namespace : the namespace with which the transformation corresponding to
     *             the job is associated.
     * version   : the version of the transformation.
     * 
     * @param local  the local name of the element
     * @param attrs  the attributes
     */
    private void handleJobTagStart( String local, Attributes attrs ) {
        mCurrentJobSubInfo = new Job();
        
        mCurrentJobSubInfo.namespace   = attrs.getValue("", "namespace");
        mCurrentJobSubInfo.logicalName = attrs.getValue("", "name");
        mCurrentJobSubInfo.version     = attrs.getValue("", "version");
        mCurrentJobSubInfo.dvName      = attrs.getValue("", "dv-name");
        mCurrentJobSubInfo.dvNamespace = attrs.getValue("","dv-namespace");
        mCurrentJobSubInfo.dvVersion   = attrs.getValue("","dv-version");
        mCurrentJobSubInfo.level       = (attrs.getValue("","level") == null) ?
                                         -1:
                                         Integer.parseInt(attrs.getValue("","level"));
        
        
        mCurrentJobSubInfo.setLogicalID( attrs.getValue("", "id") );
        mCurrentJobSubInfo.setRuntime( attrs.getValue("","runtime") );
        
        handleJobTagStart( mCurrentJobSubInfo );
    }
    
    /**
     * Invoked when the starting of the job element is got. The following
     * information is retrieved from the tag
     *
     * name      : name of the job, which is the logical name of the job.
     * namespace : the namespace with which the transformation corresponding to
     *             the job is associated.
     * version   : the version of the transformation.
     * 
     * @param job    the <code>Job</code> object
     */
    private void handleJobTagStart( Job job ) {
        String jobId = job.getLogicalID();


        job.condorUniverse = GridGateway.JOB_TYPE.compute.toString();
        
        infoAboutAllFilesRecv = true;

        mLogger.log( "Parsing job with logical id " + job.getLogicalID(),
                     LogManager.DEBUG_MESSAGE_LEVEL );

        //mvJobIds.addElement(jobId);

        String jobName = job.logicalName;
        
        //construct the jobname/primary key for job
        StringBuffer name = new StringBuffer();

        //prepend a job prefix to job if required
        if( mJobPrefix != null ){
            name.append( mJobPrefix );
        }

        //append the name and id recevied from dax
        name.append( jobName );
        name.append( "_" );
        name.append( jobId );

        job.setName( name.toString() );

    }

    /**
     * Invoked when the end of the dag tag is reached.
     * 
     * It removes the dag file referred in the element.
     */
    private void handleDAGTagEnd() {
        /*
        //Moved to Transfer Engine
        String dag = null;
        //go through all the job input files
        //and set transfer flag to false
        for( Iterator<PegasusFile> it = mVJobInpFiles.iterator(); it.hasNext(); ){
            PegasusFile pf = it.next();
            if( pf.getLFN().equals( mDAGLFN  )){
                
                //retrieve the source url 
                if ( pf instanceof FileTransfer ){
                      dag = ((FileTransfer)pf).getSourceURL().getValue();
                }
                              
                //at the moment dax files are not staged in.
                //remove from input set of files
                it.remove();
            }
        }
        
        if( dag == null ){
            throw new RuntimeException( "Path to DAG file not specified in DAX for job " + 
                                        mCurrentJobSubInfo.getLogicalID() );
        }
        ((DAGJob)mCurrentJobSubInfo).setDAGFile( dag );
        //set the directory if specified
        ((DAGJob)mCurrentJobSubInfo).setDirectory(
                (String)mCurrentJobSubInfo.dagmanVariables.removeKey( Dagman.DIRECTORY_EXTERNAL_KEY ));

        */
        
        handleJobTagEnd();
        
    }
    
    /**
     * Invoked when the end of the job tag is reached.
     */
    private void handleDAXTagEnd() {
        /*
        //Moved to Transfer Engine

        //determine the dax input file and specify
        //the path in the argument string for now.
        String dax = mDAXLFN;
        for( Iterator<PegasusFile> it = mVJobInpFiles.iterator(); it.hasNext(); ){
            PegasusFile pf = it.next();
            if( pf.getLFN().equals( mDAXLFN  )){
                
                //retrieve the source url 
                if ( pf instanceof FileTransfer ){
                      dax = ((FileTransfer)pf).getSourceURL().getValue();
                }
                              
                //at the moment dax files are not staged in.
                //remove from input set of files
                it.remove();
            }
        }
        
        //add the dax to the argument
        StringBuffer arguments = new StringBuffer();
        arguments.append( mCurrentJobSubInfo.getArguments() ).
                  append( " --dax ").append( dax );
        mCurrentJobSubInfo.setArguments( arguments.toString() );
        */
        handleJobTagEnd();
        
    }
    
    /**
     * Invoked when the end of the job tag is reached.
     */
    private void handleJobTagEnd() {
        //adding the information about the job to mCurrentJobSubInfo
        mCurrentJobSubInfo.setInputFiles(  mVJobInpFiles );
        mCurrentJobSubInfo.setOutputFiles( mVJobOutFiles );

        
        //The job id for the compute jobs
        //is the name of the job itself.
        //All the jobs in the DAX are
        //compute jobs
        mCurrentJobSubInfo.jobClass = Job.COMPUTE_JOB;
        mCurrentJobSubInfo.jobID = mCurrentJobSubInfo.jobName;
 
        
        
        //send the job to the appropriate callback implementing class
        mCallback.cbJob(mCurrentJobSubInfo);
        
        mVJobInpFiles = new HashSet();
        mVJobOutFiles = new HashSet();
    }

    /**
     * Invoked when the starting of the profile element is got.
     */
    private void handleProfileTagStart(String local, Attributes attrs) {
        mProfileKey = attrs.getValue("key");
        mNamespace = attrs.getValue("namespace");
        mProfileTag = true;
    }


   /**
     * Invoked when the end of the execution element is reached.
     */
    private void handleExecutionTagEnd() {
        handleProfileTagEnd();

        //check if we an executable path is specified
        if( this.mCurrentJobSubInfo.hints.containsKey( Hints.PFN_HINT_KEY ) ){
            TransformationCatalogEntry entry = this.constructTCEntryFromJobHints(mCurrentJobSubInfo);
            this.mCallback.cbExecutable( entry );
        }
    }

    /**
     * Constructs a TC entry object from the contents of a job.
     * The architecture assigned to this entry is default ( INTEL32::LINUX )
     * and resource id is set to unknown.
     *
     * @param job  the job object
     *
     * @return constructed TransformationCatalogEntry
     */
    private TransformationCatalogEntry constructTCEntryFromJobHints( Job job ){
        String executable = (String) job.hints.get( Hints.PFN_HINT_KEY );
        TransformationCatalogEntry entry = new TransformationCatalogEntry();
        entry.setLogicalTransformation(job.getTXNamespace(), job.getTXName(), job.getTXVersion());
        entry.setResourceId( "unknown" );
        entry.setVDSSysInfo( new VDSSysInfo( Arch.INTEL64, Os.LINUX, "", "" ) );
        entry.setPhysicalTransformation( executable );
        //hack to determine whether an executable is
        //installed or static binary
        entry.setType( executable.startsWith("/") ?
                            TCType.INSTALLED :
                            TCType.STAGEABLE );

        return entry;
    }



    /**
     * Invoked when the end of the profile element is got.
     *
     * Here we handle all the namespaces supported by Chimera at present.
     */
    private void handleProfileTagEnd() {
        mProfileTag = false;

        //setting the command line option only if textContent > 0
        if (mTextContent.length() > 0) {

            //check if namespace is valid
            mNamespace = mNamespace.toLowerCase();
            if(!Namespace.isNamespaceValid(mNamespace)){
                //reset buffer
                mTextContent.setLength( 0 );
                mLogger.log("Namespace specified in the DAX not supported. ignoring "+ mNamespace,
                            LogManager.WARNING_MESSAGE_LEVEL);
                return;
            }

            String value = mTextContent.toString().trim();
            switch(mNamespace.charAt(0)){

                case 'c'://condor
                    mCurrentJobSubInfo.condorVariables.checkKeyInNS( mProfileKey, value );
                    break;

                case 'd'://dagman
                    mCurrentJobSubInfo.dagmanVariables.checkKeyInNS(mProfileKey, value );
                    break;

                case 'e'://env
                    mCurrentJobSubInfo.envVariables.checkKeyInNS(mProfileKey, value );
                    break;

                case 'g'://globus
                    mCurrentJobSubInfo.globusRSL.checkKeyInNS(mProfileKey, value );
                    break;

                case 'h'://hint
                    mCurrentJobSubInfo.hints.checkKeyInNS(mProfileKey, value );
                    break;

                case 'p'://pegasus
                    mCurrentJobSubInfo.vdsNS.checkKeyInNS(mProfileKey, value );
                    break;

                default:
                    //ignore should not come here ever.
                    mLogger.log("Namespace not supported. ignoring "+ mNamespace,
                            LogManager.WARNING_MESSAGE_LEVEL);
                    break;

            }


            //resetting the buffer
            mTextContent.setLength(0);
            mProfileKey = "";
            mNamespace = "";

        }
    }

    /**
     * Invoked when the starting of the child element is got. The child element
     * gives us the child of an edge of the dag. The edge being parent->child.
     */
    private void handleChildTagStart(String local, Attributes attrs) {
        mCurrentChildId = "";
        mCurrentChildId = attrs.getValue("", "ref");
        mParents        = new LinkedList();
    }

    /**
     * This passes the child and it's parents list to the callback object.
     */
    private void handleChildTagEnd(){
        //String childName = lookupName(mCurrentChildId);
        mCallback.cbParents(mCurrentChildId,mParents);
    }

    /**
     * Invoked when the starting of the parent element is got. The child element
     * gives us the child of an edge of the dag. The edge being parent->child.
     */
    private void handleParentTagStart(String local, Attributes attrs) {
        //stores the child parent Relation
        

        String parentId = attrs.getValue("", "ref");

        //looking up the parent name
        //parentName = lookupName(parentId);
//        mParents.add(parentId);
        mParents.add(  new PCRelation( parentId, this.mCurrentChildId ));
    }

    /**
     * Invoked when the starting of the Argument Tag is reached. Just set a
     * boolean variable
     */
    private void handleArgumentTagStart(String local, Attributes attrs) {
        //setting the boolean variable.
        mArgumentTag = true;
        //set the adjacency flag for
        //adjacent filename to false
        mAdjFName     = false;
    }

    
    /**
     * Invoked when the end of the Argument Tag is reached.
     *
     * The buffers are reset
     */
    private void handleArgumentTagEnd() {
        mArgumentTag = false;
        mWholeCommandString = mWholeCommandString.concat(new String(
            mTextContent));

        mWholeCommandString = this.ignoreWhitespace(mWholeCommandString);
        //adding the commmand string
        mCurrentJobSubInfo.strargs = new String(mWholeCommandString);

        //resetting mWholeCommandString
        mWholeCommandString = "";

        //resetting the buffer
        mTextContent.setLength(0);
        //System.out.println( "Argument is " + mCurrentJobSubInfo.getArguments() );

    }
    
    /**
     * Our own implementation for ignorable whitespace. A String that holds the
     * contents of data passed as text by the underlying parser. The whitespaces
     * at the end are replaced by one whitespace.
     *
     * @param str   The string that contains whitespaces.
     *
     * @return  String corresponding to the trimmed version.
     *
     */
    public String ignoreWhitespace(String str){
        return ignoreWhitespace( str, mProps.preserveParserLineBreaks() );
    }

    /**
     * Invoked when the starting of the stdout tag is reached.
     * Used to specify the stdout of the application by the user. It can be
     * a file also.
     */
    private void handleStdoutTagStart(String local, Attributes attrs) {
        mCurrentJobSubInfo.stdOut = attrs.getValue("", "file");
    }

    /**
     * Invoked when the starting of the stdin tag is reached.
     * Used to specify the stdout of the application by the user. It can be
     * a file also.
     */
    private void handleStdinTagStart(String local, Attributes attrs) {
        mCurrentJobSubInfo.stdIn = attrs.getValue("", "file");
    }

    /**
     * Invoked when the starting of the stdout tag is reached.
     * Used to specify the stderr of the application by the user. It can be
     * a file also.
     */
    private void handleStderrTagStart(String local, Attributes attrs) {
        mCurrentJobSubInfo.stdErr = attrs.getValue("", "file");
    }


    /**
     * Overrides the default implementation when the elements end tag comes.
     * This method is called automatically by the Sax parser when the end tag of
     * an element comes in the xml file.
     */

    public void endElement(String uri, String localName, String qName) {
        /*System.out.println("element end tag ---------");
                 System.out.println("line number "+ locator.getLineNumber());
                 System.out.println("URI: "+ uri);
                 System.out.println("local name " + localName);
                 System.out.println("qname: "+qName);*/

        boolean temp = true;
        String universe = GridGateway.JOB_TYPE.compute.toString(); //by default jobs are vanilla

        //when we get the end tag of argument, we change reset the currentCommOpt
        if (localName.equals("argument")) { // || localName.trim().equalsIgnoreCase("job")){
            handleArgumentTagEnd();
        }
        else if (localName.equals( "execution" )) {
            handleExecutionTagEnd();
        }
        else if (localName.equals("job")) {
            handleJobTagEnd();
        }
        else if (localName.equals("dax")) {
            handleDAXTagEnd();
        }
        else if (localName.equals("dag")) {
            handleDAGTagEnd();
        }
        else if (localName.equals("metadata")) {
            handleMetadataTagEnd();
        }
        else if (localName.equals("profile")) {
            handleProfileTagEnd();
        }
        else if (localName.equals("child")) {
            handleChildTagEnd();
        }
        else if(localName.equals("adag")){
            //call the callback interface
            mCallback.cbDone();
            return;
        }
        else if( localName.equals( "uses" ) ){
            handleUsesTagEnd();
        }
   }

    /**
     *  Here we have all the elements in our data structure. This is called
     *  automatically when the end of the XML file is reached.
     */
    public void endDocument() {
        
    }

    /**
     * The main program. The DAXParser2 can be run standalone, by which it just
     * parses the file and populates the required data objects.
     *
     */

    public static void main(String args[]) {
        //System.setProperty("vds.home","/nfs/asd2/vahi/test/chimera/");
        //DAXParser2 d = new DAXParser2("sdss.xml","isi",null);
        //DAXParser2 d = new DAXParser2("sonal.xml",new DAX2CDAG("./sonal.xml"));
        //DAXParser2 d = new DAXParser2("./testcases/black-diamond/blackdiamond_dax_1.7.xml");
        //DAXParser2 d = new DAXParser2("/nfs/asd2/vahi/gurmeet_dax.xml");

        /*DagInfo dagInfo = d.getDagInfo();

        Vector vSubInfo = d.getSubInfo();

        ADag adag = new ADag(dagInfo, vSubInfo);

        System.out.println(adag);
        */

    }

    /**
     * Returns the XML schema namespace that a document being parsed conforms
     * to.
     *
     * @return the schema namespace
     */
    public  String getSchemaNamespace( ){
        return DAXParser2.SCHEMA_NAMESPACE;
    }


    /**
     * Helps the load database to locate the DAX XML schema, if available.
     * Please note that the schema location URL in the instance document
     * is only a hint, and may be overriden by the findings of this method.
     *
     * @return a location pointing to a definition document of the XML
     * schema that can read VDLx. Result may be null, if such a document
     * is unknown or unspecified.
     */
    public String getSchemaLocation() {
        // treat URI as File, yes, I know - I need the basename
        File uri = new File(DAXParser2.SCHEMA_LOCATION);

        //get the default version with decimal point shifted right
        float defaultVersion = shiftRight( extractVersionFromSchema( uri.getName() ) );

        float schemaVersion = shiftRight( mDaxSchemaVersion );


        String child = ( schemaVersion == -1 || schemaVersion > defaultVersion)?
                       //use the default
                       uri.getName():
                       //use the schema version specified in the dax
                       "dax-" + mDaxSchemaVersion + ".xsd";

        // create a pointer to the default local position
        File dax = new File(this.mProps.getSchemaDir(), child);

        //System.out.println("\nDefault Location of Dax is " + dax.getAbsolutePath());

        // Nota bene: vds.schema.dax may be a networked URI...
        return this.mProps.getDAXSchemaLocation(dax.getAbsolutePath());
    }


    /**
     * Determines whether to use a doubleNegative or not.
     *
     * @param daxVersion the version of the dax as determined.
     *
     * @return boolean
     */
    protected boolean useDoubleNegative( String daxVersion ){
        float current =  shiftRight( daxVersion );
        boolean result = false;
        //sanity check
        if( current == -1 ){
            //we were unable to parse the dax version
            //means we assume double negative is turned off
            return result;
        }

        float base = shiftRight( this.DAX_VERSION_WITHOUT_DOUBLE_NEGATIVE );

        //we turned off double negative after >= base
        return base > current;
    }

    /**
     * Returns a float with the decimal point shifted right till the end.
     * Is necessary for comparing a String "1.10" with a String "1.9".
     *
     * @param value  the value that has to be shifted right.
     *
     * @return the float value, with the decimal point shifted or -1 in case
     *         of error.
     */
    public float shiftRight(String value){
        float result = -1;

        //sanity check in case of null value
        if( value == null ) return result;

        value = value.trim();
        int i = 0;
        for( i = 0; i < value.length(); i++){
            char c = value.charAt(i);

            //parse till the first '.'
            if ( c >= '0' && c <= '9' ) {
                continue;
            }
            else if ( c == '.' ) {
                i++;
                break;
            }
            else{
                //invalid string
                return result;
            }
        }

        //determine the multiplicative factor
        int factor = 1;
        for ( i = i ; i < value.length(); i++, factor *= 10 ){
            char c = value.charAt(i);

            //exit if any of the trailing characters are non digits
            if ( ! ( c >= '0' && c <= '9') ) return result;
        }

        result = Float.parseFloat(value) * factor;

        return result;
    }

}

